package com.lwl.java8.chapter11;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.junit.Test;

/**
 * Created by liwenlong on 2017/12/21 13:38 <br/>
 * <p>
 * Small experiments with {@link Future} and {@link CompletableFuture}: submitting a
 * blocking task to an executor, querying a single shop asynchronously, and querying
 * several shops with parallel streams or with {@code CompletableFuture}s running on
 * a dedicated thread pool.
 */
public class CompletableFutureTest {

	/** Shops queried by the {@code findPrices*} methods. */
	List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"),
			new Shop("MyFavoriteShop2"), new Shop("BuyItAll"));

	/**
	 * Pool used by {@link #findPrices2(String)}: one thread per shop, capped at 100.
	 * Its threads are daemons, so idle workers do not keep the JVM alive.
	 * Declared after {@link #shops} because the initializer reads {@code shops.size()}.
	 */
	private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
			daemonThreadFactory());

	/**
	 * Builds a thread factory whose threads are all marked as daemon threads.
	 *
	 * @return a factory producing daemon threads
	 */
	private static ThreadFactory daemonThreadFactory() {
		return runnable -> {
			Thread thread = new Thread(runnable);
			thread.setDaemon(true);
			return thread;
		};
	}

	/**
	 * Submits a task that sleeps for two seconds to a cached thread pool and waits at
	 * most three seconds for its result, reporting each possible failure mode.
	 */
	@Test
	public void futureTest() {
		ExecutorService executorService = Executors.newCachedThreadPool();
		try {
			Callable<String> slowTask = () -> {
				System.out.println("耗时操作...");
				Thread.sleep(2000);
				return "耗时操作完成";
			};
			Future<String> future = executorService.submit(slowTask);

			System.out.println("做其他操作");

			try {
				String s = future.get(3, TimeUnit.SECONDS);
				System.out.println(s);
			} catch (InterruptedException e) {
				// Restore the interrupt flag so callers can still observe the interruption.
				Thread.currentThread().interrupt();
				System.out.println("当前线程在等待过程中被中断");
				e.printStackTrace();
			} catch (ExecutionException e) {
				System.out.println("计算抛出一个异常");
				e.printStackTrace();
			} catch (TimeoutException e) {
				System.out.println("在Future对象完成之前超过已过期");
				e.printStackTrace();
			}
		} finally {
			// Release the pool's threads; this also cancels the task if the wait above gave up.
			executorService.shutdownNow();
		}
	}

	/**
	 * Calls {@code Shop.getPriceAsync}, prints how long the call itself took, then
	 * blocks on the returned future and prints the total elapsed time.
	 */
	@Test
	public void asyncTest() {
		Shop shop = new Shop("BestShop");
		long start = System.currentTimeMillis();
		// Query the shop, trying to obtain the price of the product.
		Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
		long invocationTime = System.currentTimeMillis() - start;
		System.out.println("Invocation returned after " + invocationTime + " msecs");
		// Do more work in the meantime, e.g. query other shops.
		System.out.println("do something else");
		try {
			// Read the price from the Future; this blocks while the price is not yet known.
			Double price = futurePrice.get();
			System.out.printf("Price is %.2f%n", price);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can still observe the interruption.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		long retrievalTime = System.currentTimeMillis() - start;
		System.out.println("Price returned after " + retrievalTime + " msecs");
	}

	/**
	 * Queries seven shops concurrently, one {@link CompletableFuture} per shop, on a
	 * dedicated daemon pool, then prints the elapsed milliseconds and every result.
	 */
	@Test
	public void completableTest() {
		List<Shop> shops = Arrays.asList(new Shop("aaa"), new Shop("bbb"), new Shop("ccc"), new Shop("ddd"),
				new Shop("eee"), new Shop("fff"), new Shop("ggg"));
		long start = System.currentTimeMillis();
		ExecutorService executorService = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
				daemonThreadFactory());
		try {
			// First pipeline: start every task before joining any, so they overlap.
			List<CompletableFuture<String>> priceFutures = shops.stream()
					.map(shop -> CompletableFuture.supplyAsync(() -> {
						System.out.println(Thread.currentThread().getName());
						return shop.getName() + " price is " + shop.getPrice("aaa");
					}, executorService))
					.collect(Collectors.toList());

			// Second pipeline: wait for all results, preserving shop order.
			List<String> collect = priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());

			System.out.println(System.currentTimeMillis() - start);
			for (String s : collect) {
				System.out.println(s);
			}
		} finally {
			executorService.shutdown();
		}
	}

	/**
	 * Queries every shop in {@link #shops} using a parallel stream (swap in
	 * {@code shops.stream()} to compare against sequential execution).
	 *
	 * @param product the product name passed to each shop
	 * @return one "{@code <shop> price is <price>}" line per shop
	 */
	public List<String> findPrices(String product) {
		// NOTE(review): this formats shop.getPrice(...) with %.2f while findPrices3 feeds the same
		// call to Quote::parse; Shop is not visible here - confirm its return type supports both.
		return shops.parallelStream()
				.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
				.collect(Collectors.toList());
	}

	/**
	 * Queries every shop in {@link #shops} asynchronously on the dedicated
	 * {@link #executor} and waits for all results.
	 *
	 * @param product the product name passed to each shop
	 * @return one "{@code <shop> price is <price>}" line per shop, in shop order
	 */
	public List<String> findPrices2(String product) {
		// The executor must be the second argument of supplyAsync; without it the tasks
		// run on ForkJoinPool.commonPool() and the dedicated pool is never used.
		List<CompletableFuture<String>> priceFutures = shops.stream()
				.map(shop -> CompletableFuture.supplyAsync(
						() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)), executor))
				.collect(Collectors.toList());

		// Join in a separate pipeline so every task is started before any join blocks.
		return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
	}

	/**
	 * Prints the result of {@link #findPrices2(String)} and how long it took.
	 */
	@Test
	public void streamTest() {
		long start = System.nanoTime();
		System.out.println(findPrices2("myPhone27S"));
		long duration = (System.nanoTime() - start) / 1_000_000;
		System.out.println("Done in " + duration + " msecs");
	}

	/**
	 * Simplest implementation of findPrices that uses the Discount service: each step
	 * runs sequentially, one shop after another.
	 *
	 * @param product the product name passed to each shop
	 * @return the results of {@code Discount.applyDiscount}, one per shop
	 */
	private List<String> findPrices3(String product) {
		return shops.stream()
				.map(shop -> shop.getPrice(product))
				.map(Quote::parse)
				.map(Discount::applyDiscount)
				.collect(Collectors.toList());
	}

	/**
	 * Prints every line returned by {@link #findPrices3(String)} followed by the
	 * elapsed milliseconds.
	 */
	@Test
	public void findPrices3Test() {
		long start = System.currentTimeMillis();
		List<String> findPrices3 = findPrices3("apple");
		findPrices3.forEach(System.out::println);
		System.out.println(System.currentTimeMillis() - start);
	}

	/**
	 * Runs ten two-second tasks on a ten-thread pool and waits for all of them.
	 */
	@Test
	public void test() {
		ExecutorService executor = Executors.newFixedThreadPool(10);
		try {
			List<CompletableFuture<String>> futures = new ArrayList<>(10);
			for (int i = 0; i < 10; i++) {
				CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
					try {
						System.out.println(Thread.currentThread().getName());
						Thread.sleep(2000);
					} catch (InterruptedException e) {
						// Restore the interrupt flag for the pool thread.
						Thread.currentThread().interrupt();
						e.printStackTrace();
					}
					return "";
				}, executor);
				futures.add(future);
			}
			futures.forEach(CompletableFuture::join);
		} finally {
			// This pool's threads are not daemons; without shutdown they would linger.
			executor.shutdown();
		}
	}

}
